{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import shutil\n",
    "import tensorflow as tf\n",
    "import tensorflow_data_validation as tfdv\n",
    "import tensorflow_model_analysis as tfma\n",
    "from google.protobuf import text_format \n",
    "from tensorflow.python.lib.io import file_io\n",
    "from tensorflow_transform.beam.tft_beam_io import transform_fn_io\n",
    "from tensorflow_transform.coders import example_proto_coder\n",
    "from tensorflow_transform.saved import saved_transform_io\n",
    "from tensorflow_transform.tf_metadata import dataset_schema\n",
    "from tensorflow_transform.tf_metadata import schema_utils\n",
    "from tfx.examples.chicago_taxi.trainer import task\n",
    "from tfx.examples.chicago_taxi.trainer import taxi\n",
    "import tensorflow_metadata as tfm\n",
    "\n",
    "from  hops import hdfs as hopsfs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "BASE_DIR = hopsfs.project_path(exclude_nn_addr=True)\n",
    "DATA_DIR = os.path.join(BASE_DIR, 'Resources/data')\n",
    "OUTPUT_DIR = os.path.join(BASE_DIR, 'Resources/taxi_out')\n",
    "TMP_DIR = os.path.join(BASE_DIR, 'Resources/taxi_tmp')\n",
    "\n",
    "# Base dir containing train and eval data\n",
    "TRAIN_DATA_DIR = os.path.join(DATA_DIR, 'train')\n",
    "EVAL_DATA_DIR = os.path.join(DATA_DIR, 'eval')\n",
    "\n",
    "# Base dir where TFT writes training data\n",
    "TFT_TRAIN_OUTPUT_BASE_DIR = os.path.join(OUTPUT_DIR, 'tft_train')\n",
    "TFT_TRAIN_FILE_PREFIX = 'train_transformed'\n",
    "\n",
    "# Base dir where TFT writes eval data\n",
    "TFT_EVAL_OUTPUT_BASE_DIR = os.path.join(OUTPUT_DIR, 'tft_eval')\n",
    "TFT_EVAL_FILE_PREFIX = 'eval_transformed'\n",
    "\n",
    "TF_OUTPUT_BASE_DIR = os.path.join(OUTPUT_DIR, 'tf')\n",
    "\n",
    "# Base dir where TFMA writes eval data\n",
    "TFMA_OUTPUT_BASE_DIR = os.path.join(OUTPUT_DIR, 'tfma')\n",
    "\n",
    "SERVING_MODEL_DIR = 'serving_model_dir'\n",
    "EVAL_MODEL_DIR = 'eval_model_dir'\n",
    "\n",
    "\n",
    "def get_tft_train_output_dir(run_id):\n",
    "    return _get_output_dir(TFT_TRAIN_OUTPUT_BASE_DIR, run_id)\n",
    "\n",
    "\n",
    "def get_tft_eval_output_dir(run_id):\n",
    "    return _get_output_dir(TFT_EVAL_OUTPUT_BASE_DIR, run_id)\n",
    "\n",
    "\n",
    "def get_tf_output_dir(run_id):\n",
    "    return _get_output_dir(TF_OUTPUT_BASE_DIR, run_id)\n",
    "\n",
    "def get_tfma_output_dir(run_id):\n",
    "    return _get_output_dir(TFMA_OUTPUT_BASE_DIR, run_id)\n",
    "\n",
    "def _get_output_dir(base_dir, run_id):\n",
    "    return os.path.join(base_dir, 'run_' + str(run_id))\n",
    "\n",
    "def get_schema_file():\n",
    "    return os.path.join(OUTPUT_DIR, 'schema.pbtxt')\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Prepare the Model\n",
    "\n",
    "To use TFMA, export the model into an **EvalSavedModel** by calling ``tfma.export.export_eval_savedmodel``.\n",
    "\n",
    "``tfma.export.export_eval_savedmodel`` is analogous to ``estimator.export_savedmodel`` but exports the evaluation graph as opposed to the training or inference graph. Notice that one of the inputs is ``eval_input_receiver_fn`` which is analogous to ``serving_input_receiver_fn`` for ``estimator.export_savedmodel``. For more details, refer to the documentation for TFMA on Github.\n",
    "\n",
    "Contruct the **EvalSavedModel** after training is completed."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def run_experiment(hparams):\n",
    "    \"\"\"Run the training and evaluate using the high level API\"\"\"\n",
    "\n",
    "    # Train and evaluate the model as usual.\n",
    "    estimator = task.train_and_maybe_evaluate(hparams)\n",
    "\n",
    "    # Export TFMA's sepcial EvalSavedModel\n",
    "    eval_model_dir = os.path.join(hparams.output_dir, EVAL_MODEL_DIR)\n",
    "    from hops import tensorboard\n",
    "    tensorboard.logdir = eval_model_dir\n",
    "    receiver_fn = lambda: eval_input_receiver_fn(hparams.tf_transform_dir)\n",
    "\n",
    "    tfma.export.export_eval_savedmodel(\n",
    "        estimator=estimator,\n",
    "        export_dir_base=eval_model_dir,\n",
    "        eval_input_receiver_fn=receiver_fn)\n",
    "    \n",
    "def eval_input_receiver_fn(working_dir):\n",
    "    schema = tfm.proto.v0.schema_pb2.Schema()\n",
    "    schema_text = file_io.read_file_to_string(get_schema_file())\n",
    "    text_format.Parse(schema_text, schema)\n",
    "    # Extract feature spec from the schema.\n",
    "    raw_feature_spec = schema_utils.schema_as_feature_spec(schema).feature_spec\n",
    "\n",
    "    serialized_tf_example = tf.placeholder(\n",
    "        dtype=tf.string, shape=[None], name='input_example_tensor')\n",
    "\n",
    "    # First we deserialize our examples using the raw schema.\n",
    "    features = tf.parse_example(serialized_tf_example, raw_feature_spec)\n",
    "\n",
    "    # Now that we have our raw examples, we must process them through tft\n",
    "    _, transformed_features = (\n",
    "        saved_transform_io.partially_apply_saved_transform(\n",
    "            os.path.join(working_dir, transform_fn_io.TRANSFORM_FN_DIR),\n",
    "            features))\n",
    "\n",
    "    # The key MUST be 'examples'.\n",
    "    receiver_tensors = {'examples': serialized_tf_example}\n",
    "    \n",
    "    # NOTE: Model is driven by transformed features (since training works on the\n",
    "    # materialized output of TFT, but slicing will happen on raw features.\n",
    "    features.update(transformed_features)\n",
    "    \n",
    "    return tfma.export.EvalInputReceiver(\n",
    "        features=features,\n",
    "        receiver_tensors=receiver_tensors,\n",
    "        labels=transformed_features[taxi.transformed_name(taxi.LABEL_KEY)])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Train and export the model for TFMA\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def run_local_experiment(tft_run_id=0, tf_run_id=0, num_layers=4, first_layer_size=100, scale_factor=0.7):\n",
    "    \"\"\"Helper method to train and export the model for TFMA\n",
    "    \n",
    "    The caller specifies the input and output directory by providing run ids. The optional parameters\n",
    "    allows the user to change the modelfor time series view.\n",
    "    \n",
    "    Args:\n",
    "      tft_run_id: The run id for the preprocessing. Identifies the folder containing training data.\n",
    "      tf_run_id: The run for this training run. Identify where the exported model will be written to.\n",
    "      num_layers: The number of layers used by the hiden layer.\n",
    "      first_layer_size: The size of the first hidden layer.\n",
    "      scale_factor: The scale factor between each layer in in hidden layers.\n",
    "    \"\"\"\n",
    "    hparams = tf.contrib.training.HParams(\n",
    "        # Inputs: are tf-transformed materialized features\n",
    "        train_files=os.path.join(get_tft_train_output_dir(tft_run_id), TFT_TRAIN_FILE_PREFIX + '-00000-of-*'),\n",
    "        eval_files=os.path.join(get_tft_eval_output_dir(tft_run_id), TFT_EVAL_FILE_PREFIX + '-00000-of-*'),\n",
    "        schema_file=get_schema_file(),\n",
    "        # Output: dir for trained model\n",
    "        job_dir=get_tf_output_dir(tf_run_id),\n",
    "        tf_transform_dir=get_tft_train_output_dir(tft_run_id),\n",
    "        \n",
    "        # Output: dir for both the serving model and eval_model which will go into tfma\n",
    "        # evaluation\n",
    "        output_dir=get_tf_output_dir(tf_run_id), #get_experiments_dir()\n",
    "        train_steps=10000,\n",
    "        eval_steps=5000,\n",
    "        num_layers=num_layers,\n",
    "        first_layer_size=first_layer_size,\n",
    "        scale_factor=scale_factor,\n",
    "        num_epochs=None,\n",
    "        train_batch_size=40,\n",
    "        eval_batch_size=40)\n",
    "\n",
    "    run_experiment(hparams)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from hops import experiment\n",
    "experiment.launch(lambda: run_local_experiment(0,0,4,100,0.7),\n",
    "                 name='Chicago taxi training', \n",
    "                 description='TFX Chicago taxi model training with Hopsworks Experiment')"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "",
   "name": "pysparkkernel"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "python",
    "version": 2
   },
   "mimetype": "text/x-python",
   "name": "pyspark",
   "pygments_lexer": "python2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}